{
   "cells": [
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "a89791bc-cfc2-4105-a541-a3392af3c314",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "## Feathr Feature Store For Customer360 on Azure - Demo Notebook"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "f4072c36-b190-4c8a-af43-dc004854aea4",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "This notebook illustrates the use of Feathr Feature Store to create one of the use case for Customer 360. This usecase predicts Sales amount by the Discount offered. It includes following steps:\n",
            " \n",
            "1. Install and set up Feathr with Azure\n",
            "2. Create shareable features with Feathr feature definition configs.\n",
            "3. Create a training dataset via point-in-time feature join.\n",
            "4. Compute and write features.\n",
            "5. Train a model using these features to predict Sales Amount.\n",
            "6. Materialize feature value to online store.\n",
            "7. Fetch feature value in real-time from online store for online scoring.\n",
            "\n",
            "\n",
            "The feature flow is as follows:\n",
            "![Feature Engineering](./Feature_engineering_c360.jpg)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "1632aaa6-35de-4d7f-9f88-ecfb1f927bb0",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Prerequisite: Provision cloud resources\n",
            "\n",
            "First step is to provision required cloud resources if you want to use Feathr. Feathr provides a python based client to interact with cloud resources.\n",
            "\n",
            "Please follow the steps [here](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html) to provision required cloud resources. Due to the complexity of the possible cloud environment, it is almost impossible to create a script that works for all the use cases. Because of this, [azure_resource_provision.sh](https://github.com/feathr-ai/feathr/blob/main/docs/how-to-guides/azure_resource_provision.sh) is a full end to end command line to create all the required resources, and you can tailor the script as needed, while [the companion documentation](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-cli.html) can be used as a complete guide for using that shell script.\n",
            "\n",
            "\n",
            "And the architecture is as below:\n",
            "\n",
            "![Architecture](https://github.com/feathr-ai/feathr/blob/main/docs/images/architecture.png?raw=true)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "80223a8e-8901-421c-b63d-4e11a6da5d88",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Sample Dataset\n",
            "\n",
            "In this demo, we use Feathr Feature Store to showcase Customer360 Features using Feathr. The dataset can be mounted onto a azure blob storage account and seen by executing the following command. The dataset is present in the current directory and it is referenced from [here](https://community.tableau.com/s/question/0D54T00000CWeX8SAL/sample-superstore-sales-excelxls)"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "d38f6dc4-51f7-44cd-a82d-cd08e08260e4",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "\n",
            "key = \"blobstorekey\"\n",
            "acnt = \"studiofeathrazuredevsto\"\n",
            "container = \"studio-feathrazure-dev-fs\"\n",
            "mntpnt = \"/mnt/studio-feathrazure-dev-fs\"\n",
            "\n",
            "def mountStorageContainer(storageAccount, storageAccountKey, storageContainer, blobMountPoint):\n",
            "    try:\n",
            "        print(\"Mounting {0} to {1}:\".format(storageContainer, blobMountPoint))\n",
            "        dbutils.fs.unmount(blobMountPoint)\n",
            "        \n",
            "    except Exception as e:\n",
            "        print(\"....Container is not mounted; Attempting mounting now..\")\n",
            "        \n",
            "    mountStatus = dbutils.fs.mount(source = \"wasbs://{0}@{1}.blob.core.windows.net/\".format(storageContainer, storageAccount),\n",
            "                  mount_point = blobMountPoint,\n",
            "                  extra_configs = {\"fs.azure.account.key.{0}.blob.core.windows.net\".format(storageAccount): storageAccountKey})\n",
            "    \n",
            "    print(\"....Status of mount is: \" + str(mountStatus))\n",
            "    print()\n",
            "\n",
            "    \n",
            "mountStorageContainer(acnt,key,container,mntpnt)\n",
            "\n",
            "\n",
            "df = spark.read.format(\"csv\").option(\"header\", \"true\").load(\"/mnt/studio-feathrazure-dev-fs/data/customer360.csv\")\n",
            "display(df)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "32261356-9c9e-4988-9754-ad6fc1c447e1",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Prerequisite: Install Feathr\n",
            "\n",
            "Install Feathr using pip:"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "5c988222-113b-49b2-8069-d5a44a9cb05b",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "%pip install --force-reinstall git+https://github.com/feathr-ai/feathr.git@registry_fix#subdirectory=feathr_project pandavro scikit-learn"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "1d87942f-db42-48cd-bf8f-f79c3214ce92",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Prerequisite: Configure the required environment\n",
            "\n",
            "In the first step (Provision cloud resources), you should have provisioned all the required cloud resources. If you use Feathr CLI to create a workspace, you should have a folder with a file called `feathr_config.yaml` in it with all the required configurations. Otherwise, update the configuration below.\n",
            "\n",
            "The code below will write this configuration string to a temporary location and load it to Feathr. Please still refer to [feathr_config.yaml](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) and use that as the source of truth. It should also have more explanations on the meaning of each variable."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "571bc437-8a46-4f7f-83aa-2bf50e5c5cbb",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "import tempfile\n",
            "yaml_config = \"\"\"\n",
            "\n",
            "api_version: 1\n",
            "project_config:\n",
            "  project_name: 'customer360'\n",
            "  required_environment_variables:\n",
            "    - 'REDIS_PASSWORD'\n",
            "    - 'ADLS_ACCOUNT'\n",
            "    - 'ADLS_KEY'\n",
            "    - 'BLOB_ACCOUNT'\n",
            "    - 'BLOB_KEY'\n",
            "    - 'DATABRICKS_WORKSPACE_TOKEN_VALUE '\n",
            "    \n",
            "offline_store:\n",
            "  adls:\n",
            "    adls_enabled: true\n",
            "  wasb:\n",
            "    wasb_enabled: true\n",
            "  s3:\n",
            "    s3_enabled: false\n",
            "    s3_endpoint: 's3.amazonaws.com'\n",
            "  jdbc:\n",
            "    jdbc_enabled: false\n",
            "    jdbc_database: ''\n",
            "    jdbc_table: ''\n",
            "  snowflake:\n",
            "    snowflake_enabled: false\n",
            "    url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
            "    user: \"<replace_with_your_user>\"\n",
            "    role: \"<replace_with_your_user_role>\"\n",
            "    warehouse: \"<replace_with_your_warehouse>\"\n",
            "spark_config:\n",
            "  spark_cluster: 'databricks'\n",
            "  spark_result_output_parts: '1'\n",
            "  azure_synapse:\n",
            "    dev_url: 'https://feathrazure.dev.azuresynapse.net'\n",
            "    pool_name: 'spark3'\n",
            "    workspace_dir: 'abfss://container@blobaccountname.dfs.core.windows.net/demo_data1/'\n",
            "    executor_size: 'Small'\n",
            "    executor_num: 1\n",
            "  databricks:\n",
            "    workspace_instance_url: \"https://<replace_with_your_databricks_host>.azuredatabricks.net/\"\n",
            "    workspace_token_value: \"\"\n",
            "    config_template: '{\"run_name\":\"\",\"new_cluster\":{\"spark_version\":\"9.1.x-scala2.12\",\"node_type_id\":\"Standard_D3_v2\",\"num_workers\":2,\"spark_conf\":{}},\"libraries\":[{\"jar\":\"\"}],\"spark_jar_task\":{\"main_class_name\":\"\",\"parameters\":[\"\"]}}'\n",
            "    \n",
            "    work_dir: 'dbfs:/customer360'\n",
            "online_store:\n",
            "  redis:\n",
            "    host: '<replace_with_your_redis>.redis.cache.windows.net'\n",
            "    port: 6380\n",
            "    ssl_enabled: True\n",
            "feature_registry:\n",
            "  api_endpoint: \"https://<replace_with_your_api_endpoint>.azurewebsites.net/api/v1\"\n",
            "\"\"\"\n",
            "# write this configuration string to a temporary location and load it to Feathr\n",
            "tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)\n",
            "with open(tmp.name, \"w\") as text_file:\n",
            "    text_file.write(yaml_config)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "b37406db-23a6-40c4-966f-ccc0f8a3c853",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Import necessary libraries"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "3a5438fa-42fa-40eb-9a4e-d24ac68f9042",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "import glob\n",
            "import os\n",
            "import tempfile\n",
            "from datetime import datetime, timedelta\n",
            "from math import sqrt\n",
            "import pandas as pd\n",
            "import pandavro as pdx\n",
            "from feathr import FeathrClient\n",
            "from feathr import BOOLEAN, FLOAT, INT32, ValueType,STRING\n",
            "from feathr import Feature, DerivedFeature, FeatureAnchor\n",
            "from feathr import FeatureAnchor\n",
            "from feathr.client import FeathrClient\n",
            "from feathr import DerivedFeature\n",
            "from feathr import BackfillTime, MaterializationSettings\n",
            "from feathr import FeatureQuery, ObservationSettings\n",
            "from feathr import RedisSink\n",
            "from feathr import INPUT_CONTEXT, HdfsSource\n",
            "from feathr import WindowAggTransformation\n",
            "from feathr import TypedKey\n",
            "from sklearn.metrics import mean_squared_error\n",
            "from sklearn.model_selection import train_test_split"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "3bda4f77-8418-460f-83ad-bb442f9a0525",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Setup necessary environment variables\n",
            "\n",
            "You have to setup the environment variables in order to run this sample. More environment variables can be set by referring to [feathr_config.yaml](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) and use that as the source of truth. It should also have more explanations on the meaning of each variable."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "041041b0-ac69-4ab5-a993-509471bf334c",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "import os\n",
            "os.environ['REDIS_PASSWORD'] = ''\n",
            "os.environ['ADLS_ACCOUNT'] = ''\n",
            "os.environ['ADLS_KEY'] = ''\n",
            "os.environ['BLOB_ACCOUNT'] = \"\"\n",
            "os.environ['BLOB_KEY'] = ''\n",
            "os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = ''"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "ec09a17d-ec64-4b9f-999f-9a71a508eaed",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Initialize a feathr client"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "88aec1c1-2bdc-42d2-918d-48c0e28fdd0f",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "client = FeathrClient(config_path=tmp.name)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "64f272f0-7008-4de7-89fe-a9d32f5573a0",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Define Sources Section\n",
            "A feature source is needed for anchored features that describes the raw data in which the feature values are computed from. See the python documentation to get the details on each input column."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "6162dfd7-0791-4e9b-8200-da7710272c1e",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "batch_source = HdfsSource(name=\"cosmos_final_data\",\n",
            "                          path=\"abfss://container@blobaccountname.dfs.core.windows.net/data/customer360.csv\",\n",
            "                          event_timestamp_column=\"sales_order_dt\",\n",
            "                          timestamp_format=\"yyyy-MM-dd\")"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "9d08913f-e416-46e3-9bf3-31f50e41139f",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Defining Features with Feathr:\n",
            "In Feathr, a feature is viewed as a function, mapping from entity id or key, and timestamp to a feature value."
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "aa303679-7be2-430b-8194-19c90a28c4af",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Define Anchors and  Features\n",
            "A feature is called an anchored feature when the feature is directly extracted from the source data, rather than computed on top of other features."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "9d95d006-8d9a-4e63-b7b0-2c88a166c6cb",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "f_sales_cust_id = Feature(name = \"f_sales_cust_id\",\n",
            "                          feature_type = STRING, transform = \"sales_cust_id\" )\n",
            "\n",
            "f_sales_tran_id = Feature(name = \"f_sales_tran_id\",\n",
            "                          feature_type = STRING, transform = \"sales_tran_id\" )\n",
            "\n",
            "f_sales_order_id = Feature(name = \"f_sales_order_id\",\n",
            "                           feature_type = STRING, transform = \"sales_order_id\" )\n",
            "\n",
            "f_sales_item_quantity = Feature(name = \"f_sales_item_quantity\", \n",
            "                                feature_type = INT32, transform = \"cast_float(sales_item_quantity)\" )\n",
            "\n",
            "f_sales_order_dt = Feature(name = \"f_sales_order_dt\",\n",
            "                           feature_type = STRING, transform = \"sales_order_dt\" )\n",
            "\n",
            "f_sales_sell_price = Feature(name = \"f_sales_sell_price\",\n",
            "                             feature_type = INT32, transform = \"cast_float(sales_sell_price)\" )\n",
            "\n",
            "f_sales_discount_amt = Feature(name = \"f_sales_discount_amt\",\n",
            "                               feature_type = INT32, transform = \"cast_float(sales_discount_amt)\" )\n",
            "\n",
            "f_payment_preference = Feature(name = \"f_payment_preference\",\n",
            "                               feature_type = STRING, transform = \"payment_preference\" )\n",
            "\n",
            "\n",
            "features = [f_sales_cust_id, f_sales_tran_id, f_sales_order_id, f_sales_item_quantity, \n",
            "            f_sales_order_dt, f_sales_sell_price, f_sales_discount_amt, f_payment_preference]\n",
            "\n",
            "request_anchor = FeatureAnchor(name=\"request_features\",\n",
            "                                source=INPUT_CONTEXT,\n",
            "                                features=features)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "b84cebbe-884f-4665-9df7-dc3a16037fc5",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Define Derived Features\n",
            "Derived features are the features that are computed from other features. They could be computed from anchored features, or other derived features."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "d620ae76-b8ed-4bfe-a0dd-2e50ffd79212",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "f_total_sales_amount = DerivedFeature(name = \"f_total_sales_amount\",\n",
            "                                   feature_type = FLOAT,\n",
            "                                   input_features = [f_sales_item_quantity,f_sales_sell_price],\n",
            "                                   transform = \"f_sales_item_quantity * f_sales_sell_price\")\n",
            "\n",
            "f_total_sales_discount= DerivedFeature(name = \"f_total_sales_discount\",\n",
            "                                   feature_type = FLOAT,\n",
            "                                   input_features = [f_sales_item_quantity,f_sales_discount_amt],\n",
            "                                   transform = \"f_sales_item_quantity * f_sales_discount_amt\")\n",
            "\n",
            "\n",
            "f_total_amount_paid= DerivedFeature(name = \"f_total_amount_paid\",\n",
            "                                   feature_type = FLOAT,\n",
            "                                   input_features = [f_sales_sell_price,f_sales_discount_amt],\n",
            "                                   transform =\"f_sales_sell_price - f_sales_discount_amt\")"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "3230211b-c978-44d3-9996-edd53fa952f0",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Define Aggregate features and anchor the features to batch source.\n",
            "\n",
            "Note that if the data source is from the observation data, the source section should be INPUT_CONTEXT to indicate the source of those defined anchors."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "ef204054-6638-4ff5-ba46-330256f553ed",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "customer_ID = TypedKey(key_column=\"sales_cust_id\",\n",
            "                       key_column_type=ValueType.INT32,\n",
            "                       description=\"customer ID\",\n",
            "                       full_name=\"cosmos.sales_cust_id\")\n",
            "\n",
            "agg_features = [Feature(name=\"f_avg_customer_sales_amount\",\n",
            "                        key=customer_ID,\n",
            "                        feature_type=FLOAT,\n",
            "                        transform=WindowAggTransformation(agg_expr=\"cast_float(sales_sell_price)\",\n",
            "                                                          agg_func=\"AVG\",\n",
            "                                                          window=\"1d\")),\n",
            "               \n",
            "               Feature(name=\"f_avg_customer_discount_amount\",\n",
            "                        key=customer_ID,\n",
            "                        feature_type=FLOAT,\n",
            "                        transform=WindowAggTransformation(agg_expr=\"cast_float(sales_discount_amt)\",\n",
            "                                                          agg_func=\"AVG\",\n",
            "                                                          window=\"1d\")),\n",
            "               \n",
            "              Feature(name=\"f_avg_item_ordered_by_customer\",\n",
            "                        key=customer_ID,\n",
            "                        feature_type=FLOAT,\n",
            "                        transform=WindowAggTransformation(agg_expr=\"cast_float(sales_item_quantity)\",\n",
            "                                                          agg_func=\"AVG\",\n",
            "                                                          window=\"1d\"))]\n",
            "\n",
            "agg_anchor = FeatureAnchor(name=\"aggregationFeatures\",\n",
            "                           source=batch_source,\n",
            "                           features=agg_features)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "1e803a63-80b6-40ad-9419-422ca1db3d97",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Building Features\n",
            "And then we need to build those features so that it can be consumed later. Note that we have to build both the \"anchor\" and the \"derived\" features (which is not anchored to a source)."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "949a79bd-e4e9-487a-9a5c-b04cdecba3b3",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "client.build_features(anchor_list=[request_anchor,agg_anchor], derived_feature_list=[f_total_sales_amount, f_total_sales_discount,f_total_amount_paid])"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "684aef42-53e1-4548-b604-9a581abda253",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Registering Features\n",
            "We can also register the features with an Apache Atlas compatible service, such as Azure Purview, and share the registered features across teams:"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "c486ea65-9ee9-4f73-aa61-33873ade8fae",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "client.register_features()"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "00f5b4d9-5054-4a53-a511-2b8380c08ef5",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "client.list_registered_features(project_name=\"customer360\")"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "30e09585-3917-4d3b-8681-15360ad74972",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Create training data using point-in-time correct feature join\n",
            "A training dataset usually contains entity id columns, multiple feature columns, event timestamp column and label/target column.\n",
            "\n",
            "To create a training dataset using Feathr, one needs to provide a feature join configuration file to specify what features and how these features should be joined to the observation data."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "2c2baf2c-835f-4aa9-8e01-b7c0e8711081",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "feature_query = FeatureQuery(\n",
            "    feature_list=[\"f_avg_item_ordered_by_customer\",\"f_avg_customer_discount_amount\",\"f_avg_customer_sales_amount\",\"f_total_sales_discount\"], key=customer_ID)\n",
            "settings = ObservationSettings(\n",
            "    observation_path=\"abfss://container@blobaccountname.dfs.core.windows.net/data/customer360.csv\",\n",
            "    event_timestamp_column=\"sales_order_dt\",\n",
            "    timestamp_format=\"yyyy-MM-dd\")"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "8cf3eb6a-d014-429f-9f57-28aa2870785d",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Materialize feature value into offline storage\n",
            "While Feathr can compute the feature value from the feature definition on-the-fly at request time, it can also pre-compute and materialize the feature value to offline and/or online storage."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "f1209ef3-f865-44fd-8721-14c6fa131d1b",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "client.get_offline_features(observation_settings=settings,\n",
            "                            feature_query=feature_query,\n",
            "                            output_path=\"abfss://container@blobaccountname.dfs.core.windows.net/data/output/output.avro\")\n",
            "client.wait_job_to_finish(timeout_sec=500)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "b54f11fe-0a2e-4223-9090-68b12d3b3fb4",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Reading training data from offline storage"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "f805989c-d7f6-43e5-bd3c-0299c2f1beb7",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "path = '/mnt/studio-feathrazure-dev-fs/cosmos/output/output'\n",
            "df= spark.read.format(\"avro\").load(path)\n",
            "\n",
            "df = df.toPandas()\n",
            "display(df)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "7e398ee6-e2eb-4cf6-90b1-a71dc693a2c0",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "####Train a ML model\n",
            "\n",
            "After getting all the features, let's train a machine learning model with the converted feature by Feathr:"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "a6948c79-0b06-41a7-8df0-4332d40a5b8a",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "X = df['f_total_sales_discount']\n",
            "y = df['f_total_sales_amount']\n",
            "\n",
            "X_train, X_test, y_train, y_test = train_test_split(X, y, train_size = 0.7, test_size = 0.3, random_state = 100)\n",
            "\n",
            "# Add a constant to get an intercept\n",
            "X_train_sm = sm.add_constant(X_train)\n",
            "\n",
            "# Fit the resgression line using 'OLS'\n",
            "lr = sm.OLS(y_train, X_train_sm).fit()\n",
            "\n",
            "# Add a constant to X_test\n",
            "X_test_sm = sm.add_constant(X_test)\n",
            "\n",
            "# Predict the y values corresponding to X_test_sm\n",
            "y_pred = lr.predict(X_test_sm)\n",
            "\n",
            "# Checking the R-squared on the test set\n",
            "\n",
            "r_squared = r2_score(y_test, y_pred)\n",
            "r_squared\n",
            "\n",
            "\n",
            "print(\"Model MAPE:\")\n",
            "print(1 - r_squared)\n",
            "print()\n",
            "print(\"Model Accuracy:\")\n",
            "print(r_squared)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "69cb5413-7327-4e64-81d3-f76010a6af52",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Materialize feature value into online storage\n",
            "We can push the generated features to the online store like below"
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "706dcf1b-64d1-47d0-8bbe-88c8af82a464",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "\n",
            "redisSink = RedisSink(table_name=\"Customer360\")\n",
            "settings = MaterializationSettings(\"cosmos_feathr_table\",\n",
            "                                   sinks=[redisSink],\n",
            "                                   feature_names=[\"f_avg_item_ordered_by_customer\",\"f_avg_customer_discount_amount\"])\n",
            "\n",
            "client.materialize_features(settings, allow_materialize_non_agg_feature =True)\n",
            "client.wait_job_to_finish(timeout_sec=500)"
         ]
      },
      {
         "cell_type": "markdown",
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "6f6cba7c-255b-4713-8c0b-023bdb4c2c55",
               "showTitle": false,
               "title": ""
            }
         },
         "source": [
            "#### Fetching feature value for online inference\n",
            "For features that are already materialized by the previous step, their latest value can be queried via the client's get_online_features or multi_get_online_features API."
         ]
      },
      {
         "cell_type": "code",
         "execution_count": null,
         "metadata": {
            "application/vnd.databricks.v1+cell": {
               "inputWidgets": {},
               "nuid": "40409c79-79fc-400e-a32b-fce3bdc682e6",
               "showTitle": false,
               "title": ""
            }
         },
         "outputs": [],
         "source": [
            "client.get_online_features(feature_table = \"Customer360\",\n",
            "                           key = \"KB-16240\",\n",
            "                           feature_names = ['f_avg_item_ordered_by_customer'])"
         ]
      }
   ],
   "metadata": {
      "application/vnd.databricks.v1+notebook": {
         "dashboards": [],
         "language": "python",
         "notebookMetadata": {
            "pythonIndentUnit": 4
         },
         "notebookName": "Customer360_MS_V2",
         "notebookOrigID": 2897062443582288,
         "widgets": {}
      },
      "language_info": {
         "name": "python"
      }
   },
   "nbformat": 4,
   "nbformat_minor": 0
}
